import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;




/**
 * Batch Table API example: lateral left outer join between a table of orders
 * and a user-defined table function (UDTF).
 *
 * <p>The UDTF is registered under the name {@code split} and is applied to the
 * {@code product} column of every order; its three output columns are exposed
 * as {@code product1}, {@code product2} and {@code product3}.
 */
public class LeftOuterJoinwithTableFunction
{
    /**
     * Builds the sample data set, registers the view and the table function,
     * runs the lateral left outer join and prints the resulting rows.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception
    {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(batchEnv);

        // Sample orders. The product field packs several names separated by '#'.
        DataSet<Order> orderSet = batchEnv.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer#apple#jeep", 3),
                        new Order(3L, "rubber#apple#car", 2),
                        new Order(1L, "diaper#rubber#jeep", 4)));

        // Expose the data set as the view "Orders" with columns user, product, amount.
        tableEnv.createTemporaryView(
                "Orders",
                orderSet,
                $("user"),
                $("product"),
                $("amount"));

        // Register the user-defined table function under the name "split".
        // NOTE(review): MySplitUDTF is defined elsewhere; presumably it splits the
        // product string on '#' into three parts -- confirm against its source.
        TableFunction<Tuple3<String, String, String>> splitFunction = new MySplitUDTF();
        tableEnv.registerFunction("split", splitFunction);

        Table orderTable = tableEnv.from("Orders");

        // Join each row of the table with the rows the function returns for it.
        // A left outer lateral join keeps the order row even when the function
        // emits nothing for it.
        Table joined = orderTable.leftOuterJoinLateral(
                call("split", $("product")).as("product1", "product2", "product3"));

        Table result = joined.select(
                $("user"),
                $("amount"),
                $("product1"),
                $("product2"),
                $("product3"));

        // Convert back to a DataSet of generic rows; print() triggers execution.
        DataSet<Row> rows = tableEnv.toDataSet(result, Row.class);
        rows.print();
    }
}
